//  Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "db/db_impl.h"
#include "db/builder.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "util/sst_file_manager_impl.h"

namespace rocksdb {

// Approximate per-file payload target (raw key+value bytes, excluding table
// metadata) for level-1 tables built by the arena flush: 1 GiB.
const size_t kExpectedFileSize = 1 << 30;

/**
 * Collects the boundary user keys of all SST files on one level.
 *
 * For every file on `level`, both its smallest and largest user key are
 * gathered; the result is sorted with the column family's user comparator
 * and de-duplicated, yielding an ascending list of unique boundary keys.
 *
 * @param cfd   column family whose current version is inspected
 * @param level level whose file boundaries are collected
 * @return sorted, unique boundary user keys (Slices referencing the keys
 *         stored in the version's FileMetaData entries)
 */
std::vector<Slice> GetLevelsBoundaries(ColumnFamilyData *cfd, int level) {
  const Comparator *ucmp = cfd->internal_comparator().user_comparator();
  const auto &level_files = cfd->current()->storage_info()->LevelFiles(level);

  std::vector<Slice> boundaries;
  boundaries.reserve(level_files.size() * 2);
  for (const auto &file : level_files) {
    boundaries.emplace_back(file->smallest.user_key());
    boundaries.emplace_back(file->largest.user_key());
  }

  if (!boundaries.empty()) {
    // Sort and de-duplicate the boundary keys in user-key order.
    std::sort(boundaries.begin(), boundaries.end(),
              [ucmp](const Slice &a, const Slice &b) {
                return ucmp->Compare(a, b) < 0;
              });
    boundaries.erase(
        std::unique(boundaries.begin(), boundaries.end(),
                    [ucmp](const Slice &a, const Slice &b) {
                      return ucmp->Compare(a, b) == 0;
                    }),
        boundaries.end());
  }

  return boundaries;
}

/**
 * Builds one or more level-1 SST files from the column family's memory arena.
 *
 * @details A file is cut when its accumulated raw key/value payload exceeds
 *          `expected_file_size`, or when the next user key would cross one of
 *          the level-2 boundary keys, so no produced level-1 file straddles a
 *          level-2 file range. Range-deletion tombstones that span a cut
 *          point are split between the two files.
 *
 * @param dbname        database name (event logging only)
 * @param env           environment used for file I/O and time
 * @param env_options   options for the new writable files
 * @param iter          iterator over the raw memory-arena entries
 * @param range_del_iterator range-deletion tombstones to fold in
 * @param snapshots     live snapshot sequence numbers from db_impl
 * @param earliest_write_conflict_snapshot forwarded to the compaction iterator
 * @param snapshot_checker optional snapshot checker
 * @param event_logger  sink for table-file creation events
 * @param job_id        flush job id (event logging)
 * @param write_hint    write-lifetime hint applied to each new file
 * @param expected_file_size approximate per-file target (key+value bytes only)
 * @param version_set   used to allocate new file numbers
 * @param cfd           column family being flushed
 * @param edit          out: receives one AddFile record per table built
 * @param metas         out: receives the FileMetaData of each table built
 * @param boundary_keys sorted, unique level-2 boundary user keys used to cut
 *                      files; may be produced by GetLevelsBoundaries
 * @return OK on success, otherwise the first error encountered
 */
Status BuildLevel1Tables(
    const std::string &dbname, Env *env, const EnvOptions &env_options,
    std::shared_ptr<InternalIterator> iter,
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iterator,
    std::vector<SequenceNumber> snapshots, SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker *snapshot_checker, EventLogger *event_logger, int job_id,
    Env::WriteLifeTimeHint write_hint,
    size_t expected_file_size, VersionSet *version_set, ColumnFamilyData *cfd,
    VersionEdit &edit, std::vector<FileMetaData> &metas,
    const std::vector<Slice> &boundary_keys) {

  const ImmutableCFOptions &ioptions = *cfd->ioptions();
  const MutableCFOptions &mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  const InternalKeyComparator &internal_comparator = cfd->internal_comparator();
  const std::string &column_family_name = cfd->GetName();
  InternalStats *internal_stats = cfd->internal_stats();

  // Report IO statistics once per this many flushed bytes.
  const size_t kReportFlushIOStatsEvery = 1048576;
  Status s;
  iter->SeekToFirst();

  // Range-deletion aggregator; `snapshots` comes from db_impl.
  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
      new CompactionRangeDelAggregator(&internal_comparator, snapshots));
  range_del_agg->AddTombstones(std::move(range_del_iterator));
  // MergeHelper and CompactionIterator drop superseded PUTs and data that
  // should be deleted.
  MergeHelper merge(env, internal_comparator.user_comparator(),
                    ioptions.merge_operator, nullptr, ioptions.info_log,
                    true /* internal key corruption is not ok */,
                    snapshots.empty() ? 0 : snapshots.back(),
                    snapshot_checker);
  CompactionIterator compaction_iter(
      iter.get(), internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
      &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
      ShouldReportDetailedTime(env, ioptions.statistics),
      true /* internal key corruption is not ok */, range_del_agg.get());

  compaction_iter.SeekToFirst();
  auto range_del_iter = range_del_agg->NewIterator();
  range_del_iter->SeekToFirst();
  Slice compaction_iter_key;

  // Right half of a tombstone that was split at a file cut; carried over to
  // the next file while is_cut is true.
  RangeTombstone cut_tombstone_rhs(Slice(), Slice(), 0);
  bool is_cut = false;

  // Position boundary_iter at the first level-2 boundary key that lies
  // strictly after the first key of the compaction range.
  auto boundary_iter = boundary_keys.begin();
  if (iter->Valid()) {
    Slice compaction_key = compaction_iter.user_key();
    for (; boundary_iter != boundary_keys.end(); ++boundary_iter) {
      Slice boundary_key = *boundary_iter;
      if (cfd->internal_comparator().user_comparator()->Compare(compaction_key, boundary_key) < 0) {
        break;
      }
    }
  }

  // Only build files if there is any point data or any range tombstone.
  if (iter->Valid() || !range_del_agg->IsEmpty()) {
    // Build SST files until both iterators are exhausted.
    while (compaction_iter.Valid() || range_del_iter->Valid()) {
      FileMetaData sst_meta;
      sst_meta.fd = FileDescriptor(version_set->NewFileNumber(), 0, 0);
      sst_meta.fd.file_size = 0;
      // Full path of the new table file.
      std::string fname = TableFileName(ioptions.cf_paths, sst_meta.fd.GetNumber(),
                                        sst_meta.fd.GetPathId());
#ifndef ROCKSDB_LITE
      EventHelpers::NotifyTableFileCreationStarted(
          ioptions.listeners, dbname, column_family_name, fname, job_id, TableFileCreationReason::kFlush);
#endif  // !ROCKSDB_LITE
      TableProperties tp;
      TableBuilder *builder;
      std::unique_ptr<WritableFileWriter> file_writer;
      // Dictionary compression is only enabled when compacting to the
      // bottommost level, so disable it for this flush.
      CompressionOptions compression_opts_for_flush(ioptions.compression_opts);
      compression_opts_for_flush.max_dict_bytes = 0;
      compression_opts_for_flush.zstd_max_train_bytes = 0;
      // Scope that creates the SST file and its table builder.
      {
        std::unique_ptr<WritableFile> file;
#ifndef NDEBUG
        bool use_direct_writes = env_options.use_direct_writes;
        TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
#endif  // !NDEBUG
        s = NewWritableFile(env, fname, &file, env_options);
        if (!s.ok()) {
          EventHelpers::LogAndNotifyTableFileCreationFinished(
              event_logger, ioptions.listeners, dbname, column_family_name, fname,
              job_id, sst_meta.fd, tp, TableFileCreationReason::kFlush, s);
          return s;
        }
        file->SetIOPriority(Env::IO_HIGH);
        file->SetWriteLifeTimeHint(write_hint);

        file_writer.reset(
            new WritableFileWriter(std::move(file), fname, env_options, env,
                                   ioptions.statistics, ioptions.listeners));
        int64_t _current_time = 0;
        // Best-effort: on failure _current_time stays 0 (previous code bound
        // the ignored Status to an unused local).
        env->GetCurrentTime(&_current_time);
        const auto current_time = static_cast<uint64_t>(_current_time);
        builder = NewTableBuilder(
            ioptions, mutable_cf_options, internal_comparator,
            cfd->int_tbl_prop_collector_factories(), cfd->GetID(),
            column_family_name, file_writer.get(),
            GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
            mutable_cf_options.sample_for_compression, compression_opts_for_flush, 1,
            false /* skip_filters */, current_time,
            cfd->memoryarena()->GetOldestKeyTime(),
            0 /*target_file_size*/, current_time);
      }

      // Fill one SST file.
      {
        size_t data_sum = 0;

        // True when this file was cut by a level-2 boundary.
        bool is_level2_boundary_cut = false;
        // Insert the filtered key/value data into the SST.
        for (; compaction_iter.Valid(); compaction_iter.Next()) {
          const Slice &key = compaction_iter.key();
          const Slice &value = compaction_iter.value();
          const Slice &user_key = compaction_iter.user_key();
          if (boundary_iter != boundary_keys.end()) {
            const Slice &boundary_user_key = *boundary_iter;
            if (cfd->internal_comparator().user_comparator()->Compare(boundary_user_key, user_key) < 0) {
              is_level2_boundary_cut = true;
              break;
            }
          }
          builder->Add(key, value);
          sst_meta.UpdateBoundaries(key, compaction_iter.ikey().sequence);
          data_sum += key.size() + value.size();
          if (IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
            ThreadStatusUtil::SetThreadOperationProperty(
                ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
          }
          if (data_sum > expected_file_size) {
            break;
          }
        }
        // If the file was cut at a level-2 boundary, keep the compaction
        // iterator in place and advance the boundary iterator past the
        // current key. If it was cut by the size threshold, only advance the
        // compaction iterator.
        bool is_compaction_iter_valid = false;
        if (compaction_iter.Valid()) {
          compaction_iter_key = compaction_iter.user_key();
          if (is_level2_boundary_cut) {
            // BUGFIX: the old loop incremented boundary_iter and then
            // dereferenced it unconditionally, reading past
            // boundary_keys.end() once the last boundary was consumed.
            // Check the iterator before every dereference instead.
            while (boundary_iter != boundary_keys.end() &&
                   cfd->internal_comparator().user_comparator()->Compare(
                       *boundary_iter, compaction_iter_key) < 0) {
              ++boundary_iter;
            }
            is_level2_boundary_cut = false;
          } else {
            compaction_iter.Next();
          }
          is_compaction_iter_valid = true;
        }

        // If a tombstone was split at the previous cut, insert its carried
        // right half into this SST first.
        if (is_cut) {
          auto kv = cut_tombstone_rhs.Serialize();
          builder->Add(kv.first.Encode(), kv.second);
          sst_meta.UpdateBoundariesForRange(kv.first, cut_tombstone_rhs.SerializeEndKey(),
                                            cut_tombstone_rhs.seq_, internal_comparator);
          is_cut = false;
        }
        // Insert the range-deletion tombstones that belong to this SST.
        for (; range_del_iter->Valid(); range_del_iter->Next()) {
          auto tombstone = range_del_iter->Tombstone();

          // Once the compaction iterator is exhausted
          // (is_compaction_iter_valid == false), every remaining tombstone
          // goes into the current SST regardless of its range.
          if (!is_compaction_iter_valid) {
            auto kv = tombstone.Serialize();
            builder->Add(kv.first.Encode(), kv.second);
            sst_meta.UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
                                              tombstone.seq_, internal_comparator);
            continue;
          }
          Slice tombstone_start_key = tombstone.start_key_;
          Slice tombstone_end_key = tombstone.end_key_;
          int compare_result = internal_comparator.user_comparator()->Compare(tombstone_start_key, compaction_iter_key);
          if (compare_result <= 0) {
            // The tombstone starts before (or at) this SST's cut point.
            compare_result = internal_comparator.user_comparator()->Compare(compaction_iter_key, tombstone_end_key);
            if (compare_result < 0) {
              // The tombstone spans the cut point and must be split: the
              // left half goes into this file, the right half is carried
              // over to the next one.
              auto cut_tombstone_lhs = RangeTombstone(tombstone.start_key_, compaction_iter_key, tombstone.seq_);
              cut_tombstone_rhs = RangeTombstone(compaction_iter_key, tombstone.end_key_, tombstone.seq_);

              auto kv = cut_tombstone_lhs.Serialize();
              builder->Add(kv.first.Encode(), kv.second);
              sst_meta.UpdateBoundariesForRange(kv.first, cut_tombstone_lhs.SerializeEndKey(),
                                                cut_tombstone_lhs.seq_, internal_comparator);
              is_cut = true;
            } else {
              // The tombstone fits entirely in this file; no split needed.
              // kv.first: InternalKey(start_key_, seq_, kTypeRangeDeletion)
              // kv.second: end_key_
              auto kv = tombstone.Serialize();
              builder->Add(kv.first.Encode(), kv.second);
              sst_meta.UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
                                                tombstone.seq_, internal_comparator);
            }
          } else {
            // The tombstone lies entirely past this SST's range.
            break;
          }
        }
      }
      // Finish and check for builder errors
      tp = builder->GetTableProperties();
      bool empty = builder->NumEntries() == 0 && tp.num_range_deletions == 0;
      s = compaction_iter.status();
      if (!s.ok() || empty) {
        builder->Abandon();
      } else {
        s = builder->Finish();
      }

      if (s.ok() && !empty) {
        uint64_t file_size = builder->FileSize();
        sst_meta.fd.file_size = file_size;
        sst_meta.marked_for_compaction = builder->NeedCompact();
        assert(sst_meta.fd.GetFileSize() > 0);
        tp = builder->GetTableProperties(); // refresh now that builder is finished
      }
      // The builder is no longer needed.
      delete builder;
      // Finish and check for file errors
      if (s.ok() && !empty) {
        StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
        s = file_writer->Sync(ioptions.use_fsync);
      }
      if (s.ok() && !empty) {
        // The SST has been fully written.
        s = file_writer->Close();
      }

      if (s.ok() && !empty) {
        // Verify that the new table is readable.
        std::unique_ptr<InternalIterator> it(cfd->table_cache()->NewIterator(
            ReadOptions(), env_options, internal_comparator, sst_meta,
            nullptr /* range_del_agg */,
            mutable_cf_options.prefix_extractor.get(), nullptr,
            (internal_stats == nullptr) ? nullptr
                                        : internal_stats->GetFileReadHist(1),
            false /* for_compaction */, nullptr /* arena */,
            false /* skip_filter */, 1));
        s = it->status();
        if (s.ok() && mutable_cf_options.paranoid_file_checks) {
          for (it->SeekToFirst(); it->Valid(); it->Next()) {
          }
          s = it->status();
        }
      }

      // Check for input iterator errors
      if (!iter->status().ok()) {
        s = iter->status();
      }

      if (!s.ok() || sst_meta.fd.GetFileSize() == 0) {
        env->DeleteFile(fname);
      }

      // Output to event logger and fire events.
      EventHelpers::LogAndNotifyTableFileCreationFinished(
          event_logger, ioptions.listeners, dbname, column_family_name, fname,
          job_id, sst_meta.fd, tp, TableFileCreationReason::kFlush, s);

      if (!s.ok()) return s;

      // Record the new table in the manifest edit.
      // BUGFIX: guard on a non-zero file size — an abandoned (empty) builder
      // leaves file_size at 0 and its file was already deleted above, so
      // registering it would put a dangling entry into the manifest.
      if (sst_meta.fd.GetFileSize() > 0) {
        edit.AddFile(1 /* level */, sst_meta.fd.GetNumber(), sst_meta.fd.GetPathId(),
                     sst_meta.fd.GetFileSize(), sst_meta.smallest, sst_meta.largest,
                     sst_meta.fd.smallest_seqno, sst_meta.fd.largest_seqno,
                     sst_meta.marked_for_compaction);

        metas.push_back(sst_meta);
      }
    } // while: build multiple SSTs
  } // if: any data present
  return s;
}

/**
 * Flushes the column family's in-memory arena ("level-0 cache") directly to
 * level-1 SST files.
 *
 * @details Output files are cut when they exceed the size threshold or when
 *          they would cross a level-2 file boundary (see BuildLevel1Tables).
 * @param cfd column family to flush
 * @param mutable_cf_options NOTE(review): currently unused in this method;
 *        kept for interface symmetry with the other flush entry points
 * @param job_context carries the flush job id used for logging and events
 * @param snapshot_seqs live snapshot sequence numbers
 * @param earliest_write_conflict_snapshot forwarded to BuildLevel1Tables
 * @param snapshot_checker optional snapshot checker
 * @param thread_pri usually Env::HIGH; used to attribute compaction stats
 * @param expected_file_size approximate per-file size (key+value bytes only)
 * @param boundary_keys level-2 boundary user keys used to cut files; may be
 *        produced by GetLevelsBoundaries
 * @param edit out: manifest edit describing the new level-1 files
 * @param metas out: metadata of the files that were built
 * @return status of the table-building step
 *
 * Precondition: called with mutex_ held. The mutex is released for the
 * duration of the (slow) table builds and re-acquired before stats are
 * recorded.
 */
Status DBImpl::FlushL0CacheToSST(ColumnFamilyData *cfd, const MutableCFOptions &mutable_cf_options,
                                 JobContext *job_context, std::vector<SequenceNumber> &snapshot_seqs,
                                 SequenceNumber earliest_write_conflict_snapshot,
                                 SnapshotChecker *snapshot_checker, Env::Priority thread_pri,
                                 size_t expected_file_size, const std::vector<Slice> &boundary_keys,
                                 VersionEdit &edit, std::vector<FileMetaData> &metas) {
  Status s;
  mutex_.AssertHeld();
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);

  ROCKS_LOG_INFO(initial_db_options_.info_log,"[job_id %i] ------Start------", job_context->job_id);

  const uint64_t start_micros = env_->NowMicros();
  const uint64_t start_cpu_micros = env_->NowCPUNanos() / 1000;

  // Approximate write-lifetime hint for level-1 files (used when the SST
  // files are constructed).
  auto write_hint = cfd->CalculateSSTWriteHint(1);
  mutex_.Unlock();

  // Build the point-data iterator and the range-deletion iterator over the
  // memory arena.
  std::shared_ptr<InternalIterator> iter = cfd->memoryarena()->NewMemIter(false);

  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter
      (cfd->memoryarena()->NewRangeTombstoneIterator(kMaxSequenceNumber, cfd->internal_comparator()));

  s = BuildLevel1Tables(dbname_, env_, env_options_, iter, std::move(range_del_iter),
                        snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
                        &event_logger_, job_context->job_id, write_hint, expected_file_size,
                        versions_.get(), cfd, edit, metas,
                        boundary_keys);
  mutex_.Lock();

  // Record flush timing and per-file byte statistics.
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = env_->NowMicros() - start_micros;
  stats.cpu_micros = env_->NowCPUNanos() / 1000 - start_cpu_micros;

  for (const auto &meta : metas) {
    stats.bytes_written += meta.fd.GetFileSize();
  }

  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
  cfd->internal_stats()->AddCompactionStats(1 /* level */, thread_pri, stats);
  cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                    stats.bytes_written);
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
  return s;
}

/**
 * Copies all memtable entries and range tombstones into the column family's
 * MemArena.
 *
 * @param mem_iters       iterators over the memtables being flushed
 * @param range_del_iters fragmented range-tombstone iterators for the same
 *                        memtable set
 * @return OK on success; Incomplete if the arena rejects an insert
 */
Status FlushJob::WriteMemoryArena(std::vector<InternalIterator *> &mem_iters,
                                  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> &range_del_iters) {
  MemArena *memoryArena = cfd_->memoryarena();
  assert(memoryArena);

  for (auto &iter : mem_iters) {
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      Slice key(iter->key().data_, iter->key().size_);
      Slice value(iter->value().data_, iter->value().size_);
      // BUGFIX: the result used to be consumed only by assert(), so in
      // release (NDEBUG) builds a failed insert was silently ignored and the
      // entry was lost. Surface the failure as a Status instead; the assert
      // preserves the old abort-in-debug behavior.
      if (!memoryArena->KVDataInsert(key, value)) {
        assert(false);
        return Status::Incomplete("MemArena KV insert failed");
      }
    }
  }
  for (auto &range_del_iter : range_del_iters) {
    for (range_del_iter->SeekToFirst(); range_del_iter->Valid(); range_del_iter->Next()) {
      Slice key(range_del_iter->key().data_, range_del_iter->key().size_);
      Slice value(range_del_iter->value().data_, range_del_iter->value().size_);
      // Same release-mode hardening as the point-data loop above.
      if (!memoryArena->RangeDeletionInsert(key, value)) {
        assert(false);
        return Status::Incomplete("MemArena range-deletion insert failed");
      }
    }
  }
  return Status::OK();
}

/**
 * Flushes the column family's memory arena to level-1 tables and swaps the
 * obsolete level-0 files out of the version in a single manifest edit.
 *
 * @param cfd column family to flush
 * @param mutable_cf_options forwarded to FlushL0CacheToSST
 * @param job_context carries the flush job id
 * @param snapshot_seqs live snapshot sequence numbers
 * @param earliest_write_conflict_snapshot forwarded down the flush path
 * @param snapshot_checker optional snapshot checker
 * @return status of the last failing step (LogAndApply result on success)
 *
 * Precondition: called with mutex_ held (released/re-acquired inside
 * FlushL0CacheToSST and LogAndApply).
 */
Status DBImpl::FlushMemoryArena(ColumnFamilyData *cfd,
                                const MutableCFOptions &mutable_cf_options,
                                JobContext *job_context, std::vector<SequenceNumber> &snapshot_seqs,
                                SequenceNumber earliest_write_conflict_snapshot,
                                SnapshotChecker *snapshot_checker) {
  Status s;
  mutex_.AssertHeld();
  // Snapshot the current level-0 file list: these files are superseded by
  // the level-1 tables built below and will be deleted.
  std::vector<FileMetaData *> files = cfd->current()->storage_info()->LevelFiles(0);
  VersionEdit edit;
  std::vector<FileMetaData> metas;
  edit.SetColumnFamily(cfd->GetID());
  if (s.ok()) {
    s = FlushL0CacheToSST(cfd, mutable_cf_options, job_context,
                          snapshot_seqs, earliest_write_conflict_snapshot,
                          snapshot_checker, Env::HIGH, kExpectedFileSize,
                          GetLevelsBoundaries(cfd, 2), edit, metas);
    assert(s.ok());
  }
  // Remove the obsolete level-0 files from disk and record the deletions in
  // the manifest edit.
  for (auto file : files) {
    const uint32_t path_id = file->fd.GetPathId();
    const uint64_t file_id = file->fd.GetNumber();
    std::string file_name = MakeTableFileName(cfd->ioptions()->cf_paths[path_id].path,
                                              file_id);
    s = env_->DeleteFile(file_name);
    assert(s.ok());
    edit.DeleteFile(0, file_id);
  }

  s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                             &edit, &mutex_);

  auto sfm = dynamic_cast<SstFileManagerImpl *>(
            immutable_db_options_.sst_file_manager.get());
  // BUGFIX: the OnAddFile loop below used to run unconditionally while only
  // the delete loop was guarded, dereferencing a null sfm whenever no
  // sst_file_manager is configured. Both loops now live under the guard.
  if (sfm) {
    // Tell the SST file manager about the deleted level-0 files.
    for (auto file : files) {
      const uint32_t path_id = file->fd.GetPathId();
      const uint64_t file_id = file->fd.GetNumber();
      std::string file_name = MakeTableFileName(cfd->ioptions()->cf_paths[path_id].path,
                                                file_id);
      sfm->OnDeleteFile(file_name);
    }

    // Notify sst_file_manager that new files were added.
    for (const auto &meta : metas) {
      std::string file_name = MakeTableFileName(
              cfd->ioptions()->cf_paths[0].path, meta.fd.GetNumber());
      sfm->OnAddFile(file_name);
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
                Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
                "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
                &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
  }
  // Swap in a fresh memory arena; the old one is released once its refcount
  // drops to zero.
  MemArena* old_memory = cfd->memoryarena();
  cfd->SetMemoryarena(new MemArena(cfd->internal_comparator(), *cfd->ioptions()));
  cfd->memoryarena()->Ref();
  old_memory->Unref();
  return s;
}

}